from matplotlib import pyplot as plt
import numpy as np
from figure import _plot_eeg, plot_eeg
from eeg_v2 import EEG
from utils import chunks, baseline_calc, butter_bandpass_filter
plt.rcParams["figure.figsize"] = [10, 5]
# Reading raw data
path_edf="./edf/1578_alice/edf/A0001578.edf"
path_stage="./edf/1578_alice/csv/STAGE.csv"
eeg = EEG(path_edf=path_edf, path_stage=path_stage)
def get_chunks(sig):
i = 0
for task in sig.tasks:
for c in chunks(task, 15):
i+=1
yield i, c
_chunks = get_chunks(eeg)
for i in range(16):
_, chunk = next(_chunks)
print(_)
print(np.min(chunk),np.max(chunk))
# Preprocessing
sample_raw = eeg.raw.copy().crop(np.min(chunk),np.max(chunk),include_tmax=False)
sample_raw_eog = eeg.eog_channels.copy().crop(np.min(chunk),np.max(chunk),include_tmax=False)
# Baseline
sample_raw_baseline = sample_raw.copy()
sample_raw_baseline = sample_raw_baseline.apply_function(baseline_calc)
# Bandpass
sample_raw_bandpass = sample_raw_baseline.copy()
sample_raw_bandpass = sample_raw_bandpass.apply_function(butter_bandpass_filter)
# Raw data
_raw = np.zeros([12, 7500])
_raw[0:10,:] = sample_raw.get_data()
_raw[10:12,:] = sample_raw_eog.get_data()
# Bandpass data
_bandpass = np.zeros([12, 7500])
_bandpass[0:10,:] = sample_raw_bandpass.get_data()
_bandpass[10:12,:] = sample_raw_eog.get_data()
plt.rcParams["figure.figsize"] = [10,10]
_plot_eeg(_raw*1e6, fs=500, ch_names = [*sample_raw.ch_names, *sample_raw_eog.ch_names]
, markers=[
{"t":1,"c":"black"},
{"t":1.7,"c":"black"},
{"t":2.45,"c":"black"},
{"t":3.6,"c":"black"},
{"t":6,"c":"black"},
{"t":6.9,"c":"black"},
{"t":8.6,"c":"black"},
{"t":12.5,"c":"black"},]
,dpi = 1200
);
_plot_eeg(
_bandpass*1e6,
fs=500,
ch_names = [*sample_raw.ch_names, *sample_raw_eog.ch_names],
markers=[
{"t":1,"c":"black"},
{"t":1.7,"c":"black"},
{"t":2.45,"c":"black"},
{"t":3.6,"c":"black"},
{"t":6,"c":"black"},
{"t":6.9,"c":"black"},
{"t":8.6,"c":"black"},
{"t":12.5,"c":"black"},],
dpi = 1200
);
import joblib
from mne.preprocessing import ICA
clf = joblib.load("./models/eog_classifier_v2.joblib")
def _get_ica_map(ica, components=None):
"""Get ICA topomap for components"""
fast_dot = np.dot
if components is None:
components = list(range(ica.n_components_))
maps = fast_dot(ica.mixing_matrix_[:, components].T,
ica.pca_components_[:ica.n_components_])
return maps
sample_raw_train = sample_raw_bandpass.copy()
sample_raw_corrected = sample_raw_bandpass.copy()
# Fitting ICA
ica = ICA(method="extended-infomax", random_state=1)
ica.fit(sample_raw_corrected)
maps = _get_ica_map(ica).T
scalings = np.linalg.norm(maps, axis=0)
maps /= scalings[None, :]
X = maps.T
# Predict EOG
eog_preds = clf.predict(X)
list_of_eog = np.where(eog_preds == 1)[0]
list_of_eog
ica.plot_sources(inst=sample_raw_train)
ica.plot_components(inst=sample_raw_train)
ica.exclude = list_of_eog
ica.apply(sample_raw_corrected)
_corrected = np.zeros([12, 7500])
_corrected[0:10,:] = ica.get_sources(inst=sample_raw_train).get_data()
_corrected[10:12,:] = sample_raw_eog.get_data()
_plot_eeg(
_corrected*1e6,
fs=500,
ch_names = [*ica.get_sources(inst=sample_raw_train).ch_names, *sample_raw_eog.ch_names],
markers=[
{"t":1,"c":"black"},
{"t":1.7,"c":"black"},
{"t":2.45,"c":"black"},
{"t":3.6,"c":"black"},
{"t":6,"c":"black"},
{"t":6.9,"c":"black"},
{"t":8.6,"c":"black"},
{"t":12.5,"c":"black"},],
dpi = 1200
);
def pipeline():